Skip to content

[FLINK-39583][table-planner] Normalize Calcite correl variables for more effective sub-plan digest reuse#27959

Open
ferenc-csaky wants to merge 1 commit into
apache:masterfrom
ferenc-csaky:normalize-calcite-cor-vars
Open

[FLINK-39583][table-planner] Normalize Calcite correl variables for more effective sub-plan digest reuse#27959
ferenc-csaky wants to merge 1 commit into
apache:masterfrom
ferenc-csaky:normalize-calcite-cor-vars

Conversation

@ferenc-csaky
Copy link
Copy Markdown
Contributor

@ferenc-csaky ferenc-csaky commented Apr 17, 2026

What is the purpose of the change

Improve digest-based subplan reuse (when table.optimizer.reuse-optimize-block-with-digest-enabled is set to true) for plans containing correlation variables, such as CROSS JOIN UNNEST and decorrelated subqueries.

Structurally equivalent subplans will receive different Calcite CorrelationIds during separate view expansions, cause Calcite has global counter to name correlation variables ($cor2, etc). These ids become part of the relational digest, so otherwise identical subplans do not match and cannot be considered for reuse.

The change introduces correlation-variable normalization before digest-based reuse, assigning deterministic sequential correlation ids per subplan so equivalent correlated plans produce matching digests.

Brief change log

  • Introduce CorrelVariableNormalizerShuttle to adjust deterministic sequential correl ids
  • Wire in the normalizer shuttle before the SubplanReuseShuttle in RelNodeBlock
  • Add new test case into SubplanReuseTest that covers the added logic

Verifying this change

Added SubplanReuseTest#testSubplanReuseOnTemporalJoinWithUnnest which produces a proper reused exec plan in case the block digest optimization is enabled.

Before correl normalization, the exec plan would look like:

WatermarkAssigner(rowtime=[ts], watermark=[ts])(reuse_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, Probe]], fields=[k, arr, ts])

Exchange(distribution=[hash[k]])(reuse_id=[2])
+- Deduplicate(keep=[LastRow], key=[k], order=[ROWTIME], outputInsertOnly=[false])
   +- Exchange(distribution=[hash[k]])
      +- WatermarkAssigner(rowtime=[ts], watermark=[ts])
         +- TableSourceScan(table=[[default_catalog, default_database, Versioned]], fields=[k, v, ts])

Sink(table=[default_catalog.default_database.Sink1], fields=[k, v])
+- Calc(select=[k, v])
   +- TemporalJoin(joinType=[InnerJoin], where=[((k = k0) AND __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(k0), __TEMPORAL_JOIN_LEFT_KEY(k), __TEMPORAL_JOIN_RIGHT_KEY(k0)))], select=[k, ts, k0, v, ts0])
      :- Exchange(distribution=[hash[k]])
      :  +- Calc(select=[k, ts])
      :     +- Correlate(invocation=[$UNNEST_ROWS$1($cor6.arr)], correlate=[table($UNNEST_ROWS$1($cor6.arr))], select=[k,arr,ts,x], rowType=[RecordType(BIGINT k, RecordType:peek_no_expand(BIGINT x) ARRAY arr, TIMESTAMP_LTZ(3) *ROWTIME* ts, BIGINT x)], joinType=[INNER])
      :        +- Reused(reference_id=[1])
      +- Reused(reference_id=[2])

Sink(table=[default_catalog.default_database.Sink2], fields=[x, v])
+- Calc(select=[x, v])
   +- TemporalJoin(joinType=[InnerJoin], where=[((k = k0) AND __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(k0), __TEMPORAL_JOIN_LEFT_KEY(k), __TEMPORAL_JOIN_RIGHT_KEY(k0)))], select=[k, ts, x, k0, v, ts0])
      :- Exchange(distribution=[hash[k]])
      :  +- Calc(select=[k, ts, x])
      :     +- Correlate(invocation=[$UNNEST_ROWS$1($cor8.arr)], correlate=[table($UNNEST_ROWS$1($cor8.arr))], select=[k,arr,ts,x], rowType=[RecordType(BIGINT k, RecordType:peek_no_expand(BIGINT x) ARRAY arr, TIMESTAMP_LTZ(3) *ROWTIME* ts, BIGINT x)], joinType=[INNER])
      :        +- Reused(reference_id=[1])
      +- Reused(reference_id=[2])

Sink(table=[default_catalog.default_database.Sink3], fields=[k, EXPR$1, x, v, EXPR$4])
+- Calc(select=[k, CAST(ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS EXPR$1, x, v, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS EXPR$4])
   +- TemporalJoin(joinType=[InnerJoin], where=[((k = k0) AND __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(k0), __TEMPORAL_JOIN_LEFT_KEY(k), __TEMPORAL_JOIN_RIGHT_KEY(k0)))], select=[k, ts, x, k0, v, ts0])
      :- Exchange(distribution=[hash[k]])
      :  +- Calc(select=[k, ts, x])
      :     +- Correlate(invocation=[$UNNEST_ROWS$1($cor10.arr)], correlate=[table($UNNEST_ROWS$1($cor10.arr))], select=[k,arr,ts,x], rowType=[RecordType(BIGINT k, RecordType:peek_no_expand(BIGINT x) ARRAY arr, TIMESTAMP_LTZ(3) *ROWTIME* ts, BIGINT x)], joinType=[INNER])
      :        +- Reused(reference_id=[1])
      +- Reused(reference_id=[2])

Sink(table=[default_catalog.default_database.Sink4], fields=[k, EXPR$1, x, v, EXPR$4])
+- Calc(select=[k, CAST(ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS EXPR$1, x, v, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS EXPR$4])
   +- TemporalJoin(joinType=[InnerJoin], where=[((k = k0) AND __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(k0), __TEMPORAL_JOIN_LEFT_KEY(k), __TEMPORAL_JOIN_RIGHT_KEY(k0)))], select=[k, ts, x, k0, v, ts0])
      :- Exchange(distribution=[hash[k]])
      :  +- Calc(select=[k, ts, x])
      :     +- Correlate(invocation=[$UNNEST_ROWS$1($cor12.arr)], correlate=[table($UNNEST_ROWS$1($cor12.arr))], select=[k,arr,ts,x], rowType=[RecordType(BIGINT k, RecordType:peek_no_expand(BIGINT x) ARRAY arr, TIMESTAMP_LTZ(3) *ROWTIME* ts, BIGINT x)], joinType=[INNER])
      :        +- Reused(reference_id=[1])
      +- Reused(reference_id=[2])

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Apr 17, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@ferenc-csaky ferenc-csaky force-pushed the normalize-calcite-cor-vars branch 2 times, most recently from 7ec8e24 to eb95a68 Compare April 22, 2026 15:46
@ferenc-csaky ferenc-csaky force-pushed the normalize-calcite-cor-vars branch from eb95a68 to b803d47 Compare April 30, 2026 14:00
@ferenc-csaky ferenc-csaky marked this pull request as ready for review April 30, 2026 14:01
@ferenc-csaky ferenc-csaky changed the title [hotfix] Normalize Calcite correlation variables [FLINK-39583][table-planner] Normalize Calcite correl variables for more effective sub-plan digest reuse Apr 30, 2026
@ferenc-csaky ferenc-csaky force-pushed the normalize-calcite-cor-vars branch from b803d47 to 88c17e2 Compare April 30, 2026 14:19
}

@Override
public RexNode visitSubQuery(RexSubQuery subQuery) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this should also be covered by the tests. Right now they don't test sub-queries for correl variable normalization.

Copy link
Copy Markdown
Contributor Author

@ferenc-csaky ferenc-csaky May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added unit tests to cover this. An integration test e.g. in SubplanReuseTest won't hit this code, because where this normalize is applied in the optimization chain subqueries are actually already rewritten/decorrelated, so it won't make a difference what happens in the normalizer.

The reason it's still useful to have this cause from the scope of the class itself this is how it should behave if it gets a RelNode without context about what kind of optimizations that node went through already.

@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label May 3, 2026
@ferenc-csaky ferenc-csaky force-pushed the normalize-calcite-cor-vars branch from 88c17e2 to 005e62f Compare May 8, 2026 17:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants